package com.atguigu.app.dws;

import com.atguigu.app.func.SplitFunction;
import com.atguigu.bean.KeywordBean;
import com.atguigu.utils.KafkaUtil;
import com.atguigu.utils.MyClickHouseUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

//Data flow: web/app -> log server (log files) -> Flume -> Kafka(ODS) -> FlinkApp -> Kafka(DWD) -> FlinkApp -> ClickHouse(DWS)
//Programs:  Mock -> files -> f1.sh -> Kafka(ZK) -> BaseLogApp -> Kafka(ZK) -> DwsTrafficKeywordPageViewWindow -> ClickHouse(ZK)
public class Dws01TrafficKeywordPageViewWindow {

    /**
     * DWS traffic job: reads page-log records from the DWD Kafka topic, keeps
     * keyword-search events, splits the search text into words with a UDTF,
     * counts each word per 10-second tumbling event-time window, and sinks the
     * windowed counts to ClickHouse.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {

        //TODO 1. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // NOTE(review): parallelism 1 suits this single-partition teaching setup;
        // align with the Kafka topic's partition count in production.
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // In production the job should be (re)started from a Checkpoint or Savepoint.
        //2.1 Enable checkpointing every 5 seconds with exactly-once semantics
        //env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 Checkpoint timeout: 1 minute
        //env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // 2.3 Minimum pause between two checkpoints
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // 2.5 Restart strategy used when recovering from a checkpoint
        //env.setRestartStrategy(RestartStrategies.failureRateRestart(
        //        3, Time.days(1L), Time.minutes(1L)
        //));
        // 2.6 State backend (incremental RocksDB) and checkpoint storage on HDFS
        //env.setStateBackend(new EmbeddedRocksDBStateBackend(true) );
        //env.getCheckpointConfig().setCheckpointStorage(
        //      "hdfs://hadoop102:8020/flinkCDC"
        //);
        // 2.7 User name for accessing HDFS checkpoint storage
        //System.setProperty("HADOOP_USER_NAME", "atguigu");

        //TODO 2. Read the page-log topic via Flink SQL into a dynamic table.
        // `rt` is derived from the epoch-millis `ts` field and declared as the
        // event-time attribute with a 2-second out-of-orderness watermark.
        String topic = "dwd_traffic_page_log";
        String groupId = "keyword_page_view_220718";
        tableEnv.executeSql("" +
                "create table page_log( " +
                "    `common` MAP<STRING,STRING>, " +
                "    `page` MAP<STRING,STRING>, " +
                "    `ts` BIGINT, " +
                "    `rt` as TO_TIMESTAMP_LTZ(ts, 3), " +
                "    WATERMARK FOR rt AS rt - INTERVAL '2' SECOND " +
                ")" + KafkaUtil.getKafkaDDL(topic, groupId));

        //TODO 3. Filter down to keyword-search records.
        // A search event carries item_type 'keyword' and arrives from the
        // 'search' page; `item` then holds the raw query text.
        Table filterTable = tableEnv.sqlQuery("" +
                "select " +
                "    page['item'] item, " +
                "    rt " +
                "from page_log " +
                "where page['item'] is not null " +
                "and page['item_type'] = 'keyword' " +
                "and page['last_page_id'] = 'search'");
        tableEnv.createTemporaryView("filter_table", filterTable);

        //TODO 4. Register the UDTF and use it to tokenize the search text.
        // The lateral join emits one row per word produced by SplitFunction,
        // preserving the event-time column `rt` from the left side.
        tableEnv.createTemporaryFunction("splitFunc", SplitFunction.class);
        Table splitTable = tableEnv.sqlQuery("" +
                "select " +
                "    word, " +
                "    rt " +
                "from filter_table,LATERAL TABLE(splitFunc(item))");
        tableEnv.createTemporaryView("split_table", splitTable);

        //TODO 5. Group by word, apply a 10-second tumbling event-time window, aggregate.
        // UNIX_TIMESTAMP() stamps each result row at emission time — presumably
        // the version column for the ClickHouse table; verify against its DDL.
        Table resultTable = tableEnv.sqlQuery("" +
                "select " +
                "    DATE_FORMAT(TUMBLE_START(rt, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') stt, " +
                "    DATE_FORMAT(TUMBLE_END(rt, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') edt, " +
                "    word keyword, " +
                "    count(*) keyword_count, " +
                "    UNIX_TIMESTAMP() ts " +
                "from split_table " +
                "group by word, " +
                "TUMBLE(rt, INTERVAL '10' SECOND)");

        //resultTable.execute().print();

        //TODO 6. Convert the dynamic table to an append-only DataStream of KeywordBean.
        // NOTE(review): toAppendStream is deprecated since Flink 1.14; migrating to
        // tableEnv.toDataStream(resultTable, KeywordBean.class) changes the
        // column-to-field mapping rules — confirm KeywordBean field names first.
        DataStream<KeywordBean> keywordDS = tableEnv.toAppendStream(resultTable, KeywordBean.class);

        //TODO 7. Write the stream out to ClickHouse
        keywordDS.print(">>>>>>>"); // debug output — remove or switch to a logger in production
        keywordDS.addSink(MyClickHouseUtil.getSinkFunction("insert into dws_traffic_keyword_page_view_window values(?,?,?,?,?)"));

        //TODO 8. Submit and run the job
        env.execute("DwsTrafficKeywordPageViewWindow");

    }

}
